package com.jourwon.spring.boot.listener;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import javax.annotation.Resource;
import java.util.Optional;

/**
 * 自定义消息监听器抽象类
 *
 * @author JourWon
 * @date 2022/3/22
 */
public abstract class AbstractCustomMessageListener {

    private static int NUMBER_OF_LISTENERS = 0;

    @Resource
    private KafkaProperties kafkaProperties;

    /**
     * 创建 Kafka 监听器节点
     *
     * @param name
     * @param topic
     * @return
     */
    public abstract KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic);

    protected MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(String name, String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>();
        kafkaListenerEndpoint.setId(getConsumerId(name));
        kafkaListenerEndpoint.setGroupId(kafkaProperties.getConsumer().getGroupId());
        kafkaListenerEndpoint.setAutoStartup(true);
        kafkaListenerEndpoint.setTopics(topic);
        kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        return kafkaListenerEndpoint;
    }

    private String getConsumerId(String name) {
        if (isBlank(name)) {
            return AbstractCustomMessageListener.class.getCanonicalName() + "#" + NUMBER_OF_LISTENERS++;
        } else {
            return name;
        }
    }

    private boolean isBlank(String string) {
        return Optional.ofNullable(string).map(String::isEmpty).orElse(true);
    }

}
